查看原文
其他

实战 | 将Apache Hudi数据集写入阿里云OSS

hudi ApacheHudi 2022-04-23

1. 引入

云上对象存储的廉价让不少公司将其作为主要的存储方案,而Hudi作为数据湖解决方案,支持对象存储也是必不可少。之前AWS EMR已经内置集成Hudi,也意味着可以在S3上无缝使用Hudi。当然国内用户可能更多使用阿里云OSS作为云上存储方案,那么如果用户想基于OSS构建数据湖,那么Hudi是否支持呢?随着Hudi社区主分支已经合并了支持OSS的PR,现在只需要基于master分支build版本即可,或者等待下一个版本释出便可直接使用,经过简单的配置便可将数据写入OSS。

2. 配置

2.1 pom依赖

需要额外添加的主要pom依赖如下

  1. <dependency>

  2. <groupId>org.apache.hadoop</groupId>

  3. <artifactId>hadoop-aliyun</artifactId>

  4. <version>3.2.1</version>

  5. </dependency>

  6. <dependency>

  7. <groupId>com.aliyun.oss</groupId>

  8. <artifactId>aliyun-sdk-oss</artifactId>

  9. <version>3.8.1</version>

  10. </dependency>

2.2 core-site.xml配置

若需访问OSS,需要修改core-site.xml,关键配置如下

  1. <property>

  2. <name>fs.defaultFS</name>

  3. <value>oss://bucketname/</value>

  4. </property>


  5. <property>

  6. <name>fs.oss.endpoint</name>

  7. <value>oss-endpoint-address</value>

  8. <description>Aliyun OSS endpoint to connect to.</description>

  9. </property>


  10. <property>

  11. <name>fs.oss.accessKeyId</name>

  12. <value>oss_key</value>

  13. <description>Aliyun access key ID</description>

  14. </property>


  15. <property>

  16. <name>fs.oss.accessKeySecret</name>

  17. <value>oss-secret</value>

  18. <description>Aliyun access key secret</description>

  19. </property>


  20. <property>

  21. <name>fs.oss.impl</name>

  22. <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>

  23. </property>

3. 源码

示例源码如下

  1. import org.apache.hudi.QuickstartUtils.*;

  2. import org.apache.spark.api.java.JavaSparkContext;

  3. import org.apache.spark.sql.Dataset;

  4. import org.apache.spark.sql.Row;

  5. import org.apache.spark.sql.SparkSession;


  6. import java.io.IOException;

  7. import java.util.List;


  8. import static org.apache.hudi.QuickstartUtils.convertToStringList;

  9. import static org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs;

  10. import static org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME;

  11. import static org.apache.spark.sql.SaveMode.Overwrite;


  12. public class OssHudiDemo {

  13. public static void main(String[] args) throws IOException {

  14. SparkSession spark = SparkSession.builder().appName("Hoodie Datasource test")

  15. .master("local[2]")

  16. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  17. .config("spark.io.compression.codec", "snappy")

  18. .config("spark.sql.hive.convertMetastoreParquet", "false")

  19. .getOrCreate();

  20. JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());


  21. String tableName = "hudi_trips_cow";

  22. String basePath = "/tmp/hudi_trips_cow";

  23. DataGenerator dataGen = new DataGenerator();


  24. List<String> inserts = convertToStringList(dataGen.generateInserts(10));

  25. Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 2));

  26. df.write().format("org.apache.hudi").

  27. options(getQuickstartWriteConfigs()).

  28. option(TABLE_NAME, tableName).

  29. mode(Overwrite).

  30. save(basePath);


  31. Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(basePath + "/*/*/*");

  32. roViewDF.registerTempTable("hudi_ro_table");

  33. spark.sql("select * from hudi_ro_table").show(false);

  34. spark.stop();


  35. }

  36. }

即先写入OSS,下图可以看到OSS的Bucket中已经成功写入了数据,然后再通过spark查询写入的结果。

部分查询结果如下

  1. |20200421205942 |20200421205942_2_10 |6fd496f8-ebee-4f67-8f86-783ff3fed3ab|asia/india/chennai |1f71bed9-833b-4fca-8b4b-4cd014bdf88a-0_2-22-30_20200421205942.parquet|0.40613510977307 |0.5644092139040959 |driver-213|0.798706304941517 |0.02698359227182834|17.851135255091155|asia/india/chennai |rider-213|0.0|6fd496f8-ebee-4f67-8f86-783ff3fed3ab|

所有源代码已经上传至https://github.com/leesf/oss-hudi-demo

4. 最后

本篇文章很简单,只用作展示如何通过Hudi将数据写入OSS。当数据写入OSS后,便可打通阿里云上几乎所有产品,这使得基于阿里云技术栈进行数据湖分析将变得非常简单,比如使用DLA(Data Lake Analytics),对标AWS的Athena,对Hudi数据集进行分析查询,一体化的流程会让分析变得异常简单。


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存